Flask

您所在的位置:网站首页 flask sqlalchemy查询返回什么数据类型的结果 Flask

Flask

2023-11-20 03:44| 来源: 网络整理| 查看: 265

Flask-SQLAlchemy

SQLAlchemy是一个关系型数据库框架,它提供了高层的 ORM 和底层的原生数据库的操作。flask-sqlalchemy 是一个简化了 SQLAlchemy 操作的flask扩展。

文档地址:http://docs.jinkan.org/docs/flask-sqlalchemy

1、安装及设置 # 安装 flask-sqlalchemy pip install flask-sqlalchemy # 如果连接的是 mysql 数据库,需要安装 mysqldb pip install flask-mysqldb 2、数据库连接设置 在 Flask-SQLAlchemy 中,数据库使用URL指定,而且程序使用的数据库必须保存到Flask配置对象的 SQLALCHEMY_DATABASE_URI 键中 其他数据库见文档:https://docs.sqlalchemy.org/en/13/core/engines.html app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:[email protected]:3306/test' # oracle://scott:[email protected]:1521/test # mysql://scott:tiger@localhost/mydatabase # postgresql://scott:tiger@localhost/mydatabase # sqlite:////absolute/path/to/foo.db 注意开头四个斜杠 # 动态追踪修改设置,如未设置只会提示警告 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True #查询时会显示原始SQL语句 app.config['SQLALCHEMY_ECHO'] = True 其他配置 名字备注 SQLALCHEMY_DATABASE_URI 用于连接的数据库 URI 。例如:sqlite:////tmp/test.dbmysql://username:password@server/db SQLALCHEMY_BINDS 一个映射 binds 到连接 URI 的字典。 SQLALCHEMY_ECHO 如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句) SQLALCHEMY_RECORD_QUERIES 可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()。 SQLALCHEMY_NATIVE_UNICODE 可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )。 SQLALCHEMY_POOL_SIZE 数据库连接池的大小。默认是引擎默认值(通常 是 5 ) SQLALCHEMY_POOL_TIMEOUT 设定连接池的连接超时时间。默认是 10 。 SQLALCHEMY_POOL_RECYCLE 多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时。 3、常用的SQLAlchemy字段类型 类型名python中类型说明 Integer int 普通整数,一般是32位 SmallInteger int 取值范围小的整数,一般是16位 BigInteger int或long 不限制精度的整数 Float float 浮点数 Numeric decimal.Decimal 普通整数,一般是32位 String str 变长字符串 Text str 变长字符串,对较长或不限长度的字符串做了优化 Unicode unicode 变长Unicode字符串 UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化 Boolean bool 布尔值 Date datetime.date 时间 Time datetime.datetime 日期和时间 LargeBinary str 二进制文件 4、常用的SQLAlchemy列选项 选项名说明 primary_key 如果为True,代表表的主键 unique 如果为True,代表这列不允许出现重复的值 index 如果为True,为这列创建索引,提高查询效率 nullable 如果为True,允许有空值,如果为False,不允许有空值 default 为这列定义默认值 5、常用的SQLAlchemy关系选项 选项名说明 backref 在关系的另一模型中添加反向引用 primary join 明确指定两个模型之间使用的联结条件 uselist 如果为False,不使用列表,而使用标量值 order_by 指定关系中记录的排序方式 secondary 指定多对多关系中关系表的名字 secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件 6、数据库基本操作

在Flask-SQLAlchemy中,插入、修改、删除操作,均由数据库会话管理。

会话用 db.session 表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用 commit() 方法提交会话。

在 Flask-SQLAlchemy 中,查询操作是通过 query 对象操作数据。

最基本的查询是返回表中所有数据,可以通过过滤器进行更精确的数据库查询。 6.1在视图函数中定义模型类(定义之前需在数据库创建test数据库) from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) #设置连接数据库的URL app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:[email protected]:3306/test' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True #查询时会显示原始SQL语句 app.config['SQLALCHEMY_ECHO'] = True db = SQLAlchemy(app) class Role(db.Model): # 定义表名 __tablename__ = 'roles' # 定义列对象 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) us = db.relationship('User', backref='role') #repr()方法显示一个可读字符串 def __repr__(self): return 'Role:%s'% self.name class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True, index=True) email = db.Column(db.String(64),unique=True) password = db.Column(db.String(64)) role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) def __repr__(self): return 'User:%s'%self.name if __name__ == '__main__': app.run(debug=True) 6.2模型之间的关联

一对多

class Role(db.Model): ... #关键代码 us = db.relationship('User', backref='role', lazy='dynamic') ... class User(db.Model): ... role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) 其中realtionship描述了Role和User的关系。在此文中,第一个参数为对应参照的类"User" 第二个参数backref为类User申明新属性的方法 第三个参数lazy决定了什么时候SQLALchemy从数据库中加载数据 如果设置为子查询方式(subquery),则会在加载完Role对象后,就立即加载与其关联的对象,这样会让总查询数量减少,但如果返回的条目数量很多,就会比较慢 设置为 subquery 的话,role.users 返回所有数据列表 另外,也可以设置为动态方式(dynamic),这样关联对象会在被使用的时候再进行加载,并且在返回前进行过滤,如果返回的对象数很多,或者未来会变得很多,那最好采用这种方式 设置为 dynamic 的话,role.users 返回查询对象,并没有做真正的查询,可以利用查询对象做其他逻辑,比如:先排序再返回结果 多对多 registrations = db.Table('registrations', db.Column('student_id', db.Integer, db.ForeignKey('students.id')), db.Column('course_id', db.Integer, db.ForeignKey('courses.id')) ) class Course(db.Model): ... class Student(db.Model): ... courses = db.relationship('Course',secondary=registrations, backref='students', lazy='dynamic') 6.3 常用的SQLAlchemy查询过滤器 过滤器说明 filter() 把过滤器添加到原查询上,返回一个新查询 filter_by() 把等值过滤器添加到原查询上,返回一个新查询 limit 使用指定的值限定原查询返回的结果 offset() 偏移原查询返回的结果,返回一个新查询 order_by() 根据指定条件对原查询结果进行排序,返回一个新查询 group_by() 根据指定条件对原查询结果进行分组,返回一个新查询 6.4 常用的SQLAlchemy查询执行器 方法说明 all() 以列表形式返回查询的所有结果 first() 返回查询的第一个结果,如果未查到,返回None first_or_404() 返回查询的第一个结果,如果未查到,返回404 get() 返回指定主键对应的行,如不存在,返回None get_or_404() 返回指定主键对应的行,如不存在,返回404 count() 返回查询结果的数量 paginate() 返回一个Paginate对象,它包含指定范围内的结果 6.5实例测试 # 创建表: db.create_all() # 删除表 db.drop_all() # 插入一条数据 ro1 = Role(name='admin') db.session.add(ro1) db.session.commit() #再次插入一条数据 ro2 = Role(name='user') db.session.add(ro2) db.session.commit() # 一次插入多条数据 us1 = User(name='wang',email='[email protected]',password='123456',role_id=ro1.id) us2 = User(name='zhang',email='[email protected]',password='201512',role_id=ro2.id) us3 = User(name='chen',email='[email protected]',password='987654',role_id=ro2.id) db.session.add_all([us1,us2,us3]) db.session.commit() # 查询:filter_by精确查询 # 返回名字等于wang的所有人 User.query.filter_by(name='wang').all() # first()返回查询到的第一个对象 User.query.first() # filter模糊查询,返回名字结尾字符为g的所有数据。 User.query.filter(User.name.endswith('g')).all() # get():参数为主键,如果主键不存在没有返回内容 User.query.get() # 逻辑非,返回名字不等于wang的所有数据 User.query.filter(User.name!='wang').all() # not_ 相当于取反 from sqlalchemy import not_ User.query.filter(not_(User.name=='chen')).all() #逻辑与,需要导入and,返回and()条件满足的所有数据 from sqlalchemy import and_ User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all() # 逻辑或,需要导入or_ from sqlalchemy import or_ User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all() 查询数据后删除 user = User.query.first() db.session.delete(user) db.session.commit() User.query.all() 更新数据 user = User.query.first() user.name = 'dong' db.session.commit() User.query.first() # 关联查询示例: # 角色和用户的关系是一对多的关系,一个角色可以有多个用户,一个用户只能属于一个角色。 ## 查询角色的所有用户 #查询roles表id为1的角色 ro1 = Role.query.get(1) #查询该角色的所有用户 ro1.us.all() ##查询用户所属角色 #查询users表id为3的用户 us1 = User.query.get(3) #查询用户属于什么角色 us1.role 6.6数据显示&表单添加&表单验证&删除数据 (1)数据显示

定义路由函数,并将 Author 和 Book 的所有结果传到模板

@app.route('/',methods=['GET','POST']) def index(): author = Author.query.all() book = Book.query.all() return render_template('index.html',author=author,book=book)

模版关键代码

{% for x in author %} {{ x }} {% endfor %} {% for x in book %} {{ x }} {% endfor %} (2)表单添加

定义表单类

from flask_wtf import FlaskForm from wtforms.validators import DataRequired from wtforms import StringField,SubmitField #创建表单类,用来添加信息 class Append(FlaskForm): au_info = StringField(validators=[DataRequired()]) bk_info = StringField(validators=[DataRequired()]) submit = SubmitField(u'添加')

传入至模版中

#创建表单对象 @app.route('/',methods=['GET','POST']) def index(): author = Author.query.all() book = Book.query.all() form = Append() return render_template('index.html',author=author,book=book,form=form)

模板中代码

{{ form.csrf_token }} 作者:{{ form.au_info }} 书名:{{ form.bk_info }} {{ form.submit }} (3)表单验证 @app.route('/', methods=['get', 'post']) def index(): append_form = Append() if requesthod == 'POST': if append_form.validate_on_submit(): author_name = append_form.au_info.data book_name = append_form.bk_info.data # 判断数据是否存在 author = Author.query.filter_by(name=author_name).first() if not author: try: # 先添加作者 author = Author(name=author_name) db.session.add(author) db.session.commit() # 再添加书籍 book = Book(name=book_name, author_id=author.id) db.session.add(book) db.session.commit() except Exception as e: db.session.rollback() print e flash("数据添加错误") else: book_names = [book.name for book in author.books] if book_name in book_names: flash('该作者已存在相同的书名') else: try: book = Book(name=book_name, author_id=author.id) db.session.add(book) db.session.commit() except Exception as e: db.session.rollback() print e flash('数据添加错误') else: flash('数据输入有问题') authors = Author.query.all() books = Book.query.all() return render_template('test2.html', authors=authors, books=books, append_form=append_form)

重新编写html文件展示列表书籍

书籍列表 {% for author in authors %} {{ author.name }} {% for book in author.books %} {{ book.name }} {% else %} 无书籍 {% endfor %} {% endfor %} (4)删除数据

定义删除author和book的路由

# 删除作者 @app.route('/delete_author/') def delete_author(author_id): author = Author.query.get(author_id) if not author: flash('数据不存在') else: try: Book.query.filter_by(author_id=author_id).delete() db.session.delete(author) db.session.commit() except Exception as e: db.session.rollback() print e flash('操作数据库失败') return redirect(url_for('index')) # 删除书籍 @app.route('/delete_book/') def delete_book(book_id): book = Book.query.get(book_id) if not book: flash('数据不存在') else: try: db.session.delete(book) db.session.commit() except Exception as e: db.session.rollback() print e flash('操作数据库失败') return redirect(url_for('index'))

在模版中添加删除的 a 标签链接

书籍列表 {% for author in authors %} {{ author.name }} 删除 {% for book in author.books %} {{ book.name }} 删除 {% else %} 无书籍 {% endfor %} {% endfor %} 7、数据库迁移 在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。 更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中。 在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。 为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。 # 安装Flask-Migrate。 pip install flask-migrate # 代码如下 #coding=utf-8 from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate,MigrateCommand from flask_script import Shell,Manager app = Flask(__name__) manager = Manager(app) app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:[email protected]:3306/test' app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True db = SQLAlchemy(app) #第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例 migrate = Migrate(app,db) #manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令 manager.add_command('db',MigrateCommand) #定义模型Role class Role(db.Model): # 定义表名 __tablename__ = 'roles' # 定义列对象 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) user = db.relationship('User', backref='role') #repr()方法显示一个可读字符串, def __repr__(self): return 'Role:'.format(self.name) #定义用户 class User(db.Model): __talbe__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) #设置外键 role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) def __repr__(self): return 'User:'.format(self.username) if __name__ == '__main__': manager.run() 创建迁移仓库 #这个命令会创建migrations文件夹,所有迁移文件都放在里面。 python database.py db init 创建迁移脚本 自动创建迁移脚本有两个函数 upgrade():函数把迁移中的改动应用到数据库中。 downgrade():函数则将改动删除。 自动创建的迁移脚本会根据模型定义和数据库当前状态的差异,生成upgrade()和downgrade()函数的内容。 对比不一定完全正确,有可能会遗漏一些细节,需要进行检查 python database.py db migrate -m 'initial migration' 更新数据库 python database.py db upgrade 返回以前的版本

可以根据history命令找到版本号,然后传给downgrade命令:

python app.py db history 输出格式: -> 版本号 (head), initial migration

回滚到指定版本

python app.py db downgrade 版本号 实际操作顺序: 1.python 文件 db init 2.python 文件 db migrate -m"版本名(注释)" 3.python 文件 db upgrade 然后观察表结构 4.根据需求修改模型 5.python 文件 db migrate -m"新版本名(注释)" 6.python 文件 db upgrade 然后观察表结构 7.若返回版本,则利用 python 文件 db history查看版本号 8.python 文件 db downgrade(upgrade) 版本号 8、信号机制 Flask信号机制 Flask信号(signals, or event hooking)允许特定的发送端通知订阅者发生了什么(既然知道发生了什么,那我们可以根据自己业务需求实现自己的逻辑)。 Flask提供了一些信号(核心信号)且其它的扩展提供更多的信号。 信号依赖于Blinker库。 pip install blinker

flask内置信号列表:http://docs.jinkan.org/docs/flask/api.html#id17

template_rendered = _signals.signal('template-rendered') request_started = _signals.signal('request-started') request_finished = _signals.signal('request-finished') request_tearing_down = _signals.signal('request-tearing-down') got_request_exception = _signals.signal('got-request-exception') appcontext_tearing_down = _signals.signal('appcontext-tearing-down') appcontext_pushed = _signals.signal('appcontext-pushed') appcontext_popped = _signals.signal('appcontext-popped') message_flashed = _signals.signal('message-flashed') 信号应用场景

Flask-User 这个扩展中定义了名为 user_logged_in 的信号,当用户成功登入之后,这个信号会被发送。我们可以订阅该信号去追踪登录次数和登录IP:

from flask import request from flask_user.signals import user_logged_in @user_logged_in.connect_via(app) def track_logins(sender, user, **extra): user.login_count += 1 user.last_login_ip = request.remote_addr db.session.add(user) db.session.commit() Flask-SQLAlchemy 信号支持

在 Flask-SQLAlchemy 模块中,0.10 版本开始支持信号,可以连接到信号来获取到底发生什么了的通知。存在于下面两个信号:

models_committed 这个信号在修改的模型提交到数据库时发出。发送者是发送修改的应用,模型 和 操作描述符 以 (model, operation) 形式作为元组,这样的元组列表传递给接受者的 changes 参数。 该模型是发送到数据库的模型实例,当一个模型已经插入,操作是 'insert' ,而已删除是 'delete' ,如果更新了任何列,会是 'update' 。 before_models_committed 除了刚好在提交发送前发生,与 models_committed 完全相同。 from flask_sqlalchemy import models_committed # 给 models_committed 信号添加一个订阅者,即为当前 app @models_committed.connect_via(app) def models_committed(a, changes): print(a, changes) 9、常见关系模板代码

以下罗列了使用关系型数据库中常见关系定义模板代码

一对多 示例场景: 用户与其发布的帖子(用户表与帖子表) 角色与所属于该角色的用户(角色表与多用户表) 示例代码 class Role(db.Model): """角色表""" __tablename__ = 'roles' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) users = db.relationship('User', backref='role', lazy='dynamic') class User(db.Model): """用户表""" __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True, index=True) 多对多 示例场景 讲师与其上课的班级(讲师表与班级表) 用户与其收藏的新闻(用户表与新闻表) 学生与其选修的课程(学生表与选修课程表) 示例代码 tb_student_course = db.Table('tb_student_course', db.Column('student_id', db.Integer, db.ForeignKey('students.id')), db.Column('course_id', db.Integer, db.ForeignKey('courses.id')) ) class Student(db.Model): __tablename__ = "students" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) courses = db.relationship('Course', secondary=tb_student_course, backref=db.backref('students', lazy='dynamic'), lazy='dynamic') class Course(db.Model): __tablename__ = "courses" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) 自关联一对多 示例场景 评论与该评论的子评论(评论表) 参考网易新闻 示例代码 class Comment(db.Model): """评论""" __tablename__ = "comments" id = db.Column(db.Integer, primary_key=True) # 评论内容 content = db.Column(db.Text, nullable=False) # 父评论id parent_id = db.Column(db.Integer, db.ForeignKey("comments.id")) # 父评论(也是评论模型) parent = db.relationship("Comment", remote_side=[id], backref=db.backref('childs', lazy='dynamic')) # 测试代码 if __name__ == '__main__': db.drop_all() db.create_all() com1 = Comment(content='我是主评论1') com2 = Comment(content='我是主评论2') com11 = Comment(content='我是回复主评论1的子评论1') com11.parent = com1 com12 = Comment(content='我是回复主评论1的子评论2') com12.parent = com1 db.session.add_all([com1, com2, com11, com12]) db.session.commit() app.run(debug=True) 自关联多对多

示例场景

用户关注其他用户(用户表,中间表)

示例代码

tb_user_follows = db.Table( "tb_user_follows", db.Column('follower_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True), # 粉丝id db.Column('followed_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True) # 被关注人的id ) class User(db.Model): """用户表""" __tablename__ = "info_user" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(32), unique=True, nullable=False) # 用户所有的粉丝,添加了反向引用followed,代表用户都关注了哪些人 followers = db.relationship('User', secondary=tb_user_follows, primaryjoin=id == tb_user_follows.c.followed_id, secondaryjoin=id == tb_user_follows.c.follower_id, backref=db.backref('followed', lazy='dynamic'), lazy='dynamic')

 

 


【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3